package cn.tnar.parkservice.task;

import cn.tnar.parkservice.config.Constant;
import cn.tnar.parkservice.model.entity.*;
import cn.tnar.parkservice.service.ITCarFeesroleinfoService;
import cn.tnar.parkservice.service.ITParkInfoService;
import cn.tnar.parkservice.service.ITParkMemberGroupService;
import cn.tnar.parkservice.service.ITParkValueRecordService;
import cn.tnar.parkservice.util.RedisUtil;
import cn.tnar.pms.kesb.KesbClient;
import cn.tnar.pms.kesb.KesbParam;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import static cn.tnar.parkservice.util.DateUtils.*;

/**
 * @author dzx
 * @ClassName:
 * @Description:
 * @date 2019年09月10日 11:10:56
 */
@Component
@Slf4j
public class TimingParkValueTask {

    @Autowired
    @Qualifier("tParkInfoServiceImpl")
    private ITParkInfoService itParkInfoService;

    @Autowired
    @Qualifier("tParkMemberGroupServiceImpl")
    private ITParkMemberGroupService itParkMemberGroupService;

    @Autowired
    @Qualifier("tCarFeesroleinfoServiceImpl")
    private ITCarFeesroleinfoService itCarFeesroleinfoService;

    @Autowired
    @Qualifier("tParkValueRecordServiceImpl")
    private ITParkValueRecordService itParkValueRecordService;

    @Autowired
    private KesbClient kesbClient;

    @Autowired
    private RedisUtil redisUtil;

    /**
     * 下划线常量
     */
    private static final String UNDERLINE = "_";

    /**
     * 处理任务批次阈值，当达到100个子卡时开始执行计算停车价值记录
     */
    private static final int batchJobCount = 100;

    /**
     * 控制允许同一时间段内的最大跑批次线程数
     */
    private static final int threadTotal = 200;
    private static final Semaphore SEMAPHORE = new Semaphore(threadTotal);

    /**
     * 缓存线程池，核心池为0，最大线程数Integer.MAX_VALUE = 0x7fffffff
     */
    private static ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    /**
     * 当任务完成之后，控制线程池的关闭
     */
    private static CountDownLatch COUNT_DOWN_LATCH = null;


    //    @Scheduled(cron = "0/5 * * * * ?")
    @Scheduled(cron = "0 0 1 * * ?")
    public void processParkValue() {
        log.info("开始执行定时任务");
        if (EXECUTOR_SERVICE.isShutdown()) {
            EXECUTOR_SERVICE = Executors.newCachedThreadPool();
        }
        //获取所有的停车场
        List<TParkInfo> parkInfoList = itParkInfoService.list();
        if (!CollectionUtils.isEmpty(parkInfoList)) {

            log.info("本次需要处理停车价值的停车场数量为【{}】", parkInfoList.size());

            String parkCodeIn = parkInfoList.parallelStream()
                    .filter(x -> StringUtils.isNotBlank(x.getParkCode()))
                    .map(x -> "'" + x.getParkCode() + "'")
                    .distinct()
                    .collect(Collectors.joining(","));
            //根据停车场编号 和 classify 以及 type 为 1 的子卡
            QueryWrapper<TParkMemberGroup> queryWrapper = new QueryWrapper<TParkMemberGroup>();

            queryWrapper.inSql("park_code", parkCodeIn)
                    .eq("type", 1) //子卡
                    .in("classify", 2, 5)
                    .isNotNull("fee_index_id")
                    .isNotNull("feesroleinfo_id"); //访客证或者自定义卡证
            List<TParkMemberGroup> list = itParkMemberGroupService.list(queryWrapper);
            if (!CollectionUtils.isEmpty(list)) {

                log.info("本次需要处理停车价值的卡证总数量为【{}】", list.size());

                //获取前一天的日期
                Date preDate = DateUtils.addDays(new Date(), -1);
                String preDateStr = getTimeString(YYYYMMDD, preDate);

                log.info("本次计算停车价值记录的日期【{}】", preDateStr);

                int totalCount = (int) Math.ceil((double) list.size() / (double) batchJobCount);

                log.info("本次计算停车价值共需【{}】批次", totalCount);

                COUNT_DOWN_LATCH = new CountDownLatch(totalCount);
                List<TParkMemberGroup> tempList = new ArrayList<TParkMemberGroup>();
                for (TParkMemberGroup group : list) {
                    tempList.add(group);
                    //如果满足100个卡证，则执行一次批次任务
                    if (tempList.size() >= batchJobCount) {
                        EXECUTOR_SERVICE.execute(new JobThread(new ArrayList<TParkMemberGroup>(tempList), preDateStr));
                        tempList.clear();
                    }
                }
                //剩余的不足100个卡证，也执行一次批次任务
                if (tempList.size() > 0) {
                    EXECUTOR_SERVICE.execute(new JobThread(new ArrayList<TParkMemberGroup>(tempList), preDateStr));
                    tempList.clear();
                }
            }

        } else {
            log.debug("暂无停车场价值记录需要处理");
        }
        try {
            //等待两小时超时时间已过或者锁存器倒数到零，自动关闭线程池
            if (COUNT_DOWN_LATCH != null) {
                COUNT_DOWN_LATCH.await(23, TimeUnit.HOURS);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //未执行完的任务继续执行，不再接收新的任务
        EXECUTOR_SERVICE.shutdown();
    }


    class JobThread implements Runnable {
        private List<TParkMemberGroup> memberGroupList;

        private String preDate; //前一天的日期

        public JobThread(List<TParkMemberGroup> memberGroupList, String preDate) {
            this.memberGroupList = memberGroupList;
            this.preDate = preDate;
        }

        @Override
        public void run() {
            try {
                SEMAPHORE.acquire(1);

                for (TParkMemberGroup group : memberGroupList) {
                    BigDecimal parkValue = new BigDecimal(0);//停车价值
                    BigDecimal reduceMoney = new BigDecimal(0);//优免金额
                    Long countMember = 0L; //会员数
                    BigDecimal feeMoney = new BigDecimal(0); //套餐总金额
                    Long countTime = 0L; //总时长
                    Long countNumber = 0L; //总笔数


                    //根据 子卡id_日期 的key 获取 redis 中的 当日流水记录
                    Long id = group.getId();
                    String parkCode = group.getParkCode();
                    Long feeIndexId = group.getFeeIndexId();
                    Long feesroleinfoId = group.getFeesroleinfoId();
                    TCarFeesroleinfo feesroleinfo = itCarFeesroleinfoService.getById(feesroleinfoId);
                    BigDecimal monthValue = feesroleinfo.getMonthValue();

                    String KEY = id + UNDERLINE + preDate;
                    long llen = redisUtil.llen(KEY);
                    if (llen > 0) {
                        List<String> parkingRecordJsonList = redisUtil.lrange(KEY, 0, llen - 1);
                        List<TParkDayParkingRecord> parkingRecordList = parkingRecordJsonList
                                .parallelStream().filter(StringUtils::isNotBlank)
                                .map(x -> JSON.parseObject(x, TParkDayParkingRecord.class))
                                .collect(Collectors.toList());

                        for (TParkDayParkingRecord parkingRecord : parkingRecordList) {
                            long intime = parkingRecord.getIntime(); //进场时间
                            long outtime = parkingRecord.getOuttime();//出场时间
                            long cartype = parkingRecord.getCartype();//车辆类型
                            double parkamt = parkingRecord.getParkamt();
                            //根据流水调用试算c接口获取停车价值
                            KesbParam kesbParam = new KesbParam()
                                    .kput("index_id", feeIndexId)
                                    .kput("car_type", cartype)
                                    .kput("start_time", intime)
                                    .kput("end_time", outtime).kput("user_type", 0);
                            //计算停车价值
                            parkValue = parkValue.add(tryCalc(kesbParam));
                            //计算优免金额
                            reduceMoney = reduceMoney.add(new BigDecimal(parkamt));
                            //计算总时长
                            Date intimeDate = parse(YYYYMMDDHHMMSS, String.valueOf(intime));
                            Date outtimeDate = parse(YYYYMMDDHHMMSS, String.valueOf(outtime));
                            long mill = outtimeDate.getTime() - intimeDate.getTime();
                            countTime += TimeUnit.MILLISECONDS.toHours(mill);
                        }

                        //会员数 = 停车流水中的车牌号去重得到会员数
                        countMember = parkingRecordList.parallelStream()
                                .map(x -> x.getCarId())
                                .distinct().count();
                        //套餐总金额 = 会员数 * 套餐标准
                        feeMoney = new BigDecimal(countMember)
                                .multiply(monthValue);
                        //总比数 = 流水的总条数
                        countNumber = (long) parkingRecordList.size();

                        TParkValueRecord tParkValueRecord = new TParkValueRecord()
                                .setParkCode(parkCode)
                                .setParkValue(parkValue)
                                .setCurTime(Long.valueOf(preDate))
                                .setGroupId(id)
                                .setReduceMoney(reduceMoney)
                                .setFeeMoney(feeMoney)
                                .setCountTime(countTime)
                                .setCountNumber(countNumber)
                                .setCountMember(countMember);
                        itParkValueRecordService.save(tParkValueRecord);
                    }
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                SEMAPHORE.release(1);
                COUNT_DOWN_LATCH.countDown();
            }
        }


        /**
         * 接口号：85102015
         * "index_id":"120982",
         * "car_type":"2",
         * "start_time":"20190822093556",
         * "end_time":"20190823093558",
         * "user_type":"0"
         *
         * @param req
         * @return
         */
        public BigDecimal tryCalc(KesbParam req) {
            BigDecimal result = new BigDecimal(0);
            try {
                Map<String, String> request = kesbClient.request(Constant.C_PARK_FEE_RULE_TEST, req);
                if (request != null && request.size() > 0) {
                    result = new BigDecimal(request.get("money"));
                }
                return result;
            } catch (Exception e) {
                log.error("error", e);
            }
            return result;
        }
    }
}
